package com.matrix.async.core;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;

public class Comsumer extends WorkThread {
	private Logger log = Logger.getLogger(WorkThread.class);
	private final APPQueue taskQueue;

	private AtomicInteger threadNumber = new AtomicInteger(1);

	public Comsumer(APPQueue taskQueue) {
		this.taskQueue = taskQueue;
	}

	public Comsumer(String taskType, APPQueue taskQueue, ThreadFactory threadFactory) {
		this.name = "Consumer [" + taskType + "]-" + threadNumber.getAndIncrement();
		this.taskType = taskType;
		this.taskQueue = taskQueue;
		this.thread = (threadFactory != null ? threadFactory.newThread(name, this)
				: WorkThread.defaultThreadFactory().newThread(name, this));
		log.info("消费者被创建=" + this.name);

	}

	private void beforeExecute(Task task) {
		if (task != null) {
			task.beforeExecute();
		}
	}

	private void retry(Task task) {
		if (task != null && task.retry()) {
			taskQueue.put(task);
		}
	}

	private void afterExecute(Task task) {
		if (task != null) {
			task.afterExecute();
		}
	}

	private void onError(Task task, Exception e) {
		if (task != null) {
			task.onError(e);
		}
	}

	/**
	 * 获取执行任务
	 * 
	 * @return
	 */
	protected Task getTask() {
		int state = runState;
		if (state != RUNNING) {
			log.info("线程非执行状态");
			return null;
		}
		for (;;) {
			try {
				Task task = (Task) taskQueue.take();
				log.info("获取到一个任务=" + task.getID());
				return task;
			} catch (InterruptedException e) {
				log.error(":" + e.getMessage(), e);
			}
		}

	}

	protected void runTask(Task task) {

		final ReentrantLock runLock = this.runLock;
		runLock.lock();
		try {
			log.info("开始执行任务" +  task.getID());
			boolean ran = false;
			beforeExecute(task);
			try {
				if (task.execute()) {
					afterExecute(task);
					log.info(" 任务执行完成=" +task.getID());
				} else {
					log.info(" 执行失败进行重试=" +task.getID());
					retry(task);
					log.error(task.toString());
				}
				ran = true;
			} catch (Exception e) {
				log.error(":" + e.getMessage(), e);
				if (!ran) {
					onError(task, e);
				}
			}
		} finally {
			runLock.unlock();
		}

	}

	protected void work() {
		Task task = getTask();
		if (task != null) {
			if (task instanceof TaskPackage) {
				task.beforeExecute();
				List<Task> tasks = ((TaskPackage) task).getAllTask();
				for (Task task2 : tasks) {
					runTask(task2);
				}
				task.afterExecute();
			} else {
				runTask(task);
			}
		}
	}

}
